package com.gbase8c.dmt.migration;

import com.gbase8c.dmt.common.exception.MigrationException;
import com.gbase8c.dmt.config.DmtConfig;
import com.gbase8c.dmt.dao.*;
import com.gbase8c.dmt.model.enums.MigrationObjectType;
import com.gbase8c.dmt.model.migration.config.Task;
import com.gbase8c.dmt.model.migration.dto.DataSourceDto;
import com.gbase8c.dmt.model.migration.dto.DboDto;
import com.gbase8c.dmt.model.migration.dto.MigrationObject;
import com.gbase8c.dmt.model.migration.record.ExecuteRecord;
import com.gbase8c.dmt.model.migration.record.MigrationRecordDto;
import com.gbase8c.dmt.model.enums.MigrationTaskStatus;
import com.gbase8c.dmt.service.DataSourceFactory;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.ObjectUtils;

import javax.sql.DataSource;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

@Slf4j
public class MigrationExecutionService {

    private DataSourceDao dataSourceDao;
    private SnapshotDao snapshotDao;
    private DboDao dboDao;
    private DmtConfig dmtConfig;
    private RecordDao recordDao;
    private TaskDao taskDao;
    private MigrationObjectService migrationObjectService;

    public MigrationExecutionService(DmtConfig dmtConfig,
                                     DataSourceDao dataSourceDao,
                                     SnapshotDao snapshotDao,
                                     DboDao dboDao,
                                     TaskDao taskDao, RecordDao recordDao) {
        this.dmtConfig = dmtConfig;
        this.dataSourceDao = dataSourceDao;
        this.snapshotDao = snapshotDao;
        this.dboDao = dboDao;
        this.taskDao = taskDao;
        this.recordDao = recordDao;
        this.migrationObjectService = new MigrationObjectService(dmtConfig, dataSourceDao, snapshotDao);
    }

    private static Map<String, MigrationThread> _threadTable = Maps.newHashMap();

    public static Map<String, MigrationThread> getThreadTable(){
        return _threadTable;
    }

    public void runMigrationExecution(String taskId){
        MigrationThread migrationThread = _threadTable.get(taskId);
        DboDto dboDto = dboDao.get(taskId);
        if (ObjectUtils.isEmpty(dboDto)){
            throw new MigrationException("cannot find the converted dboDto which taskId is "+"taskId");
        }
        String tar = dboDto.getTask().getTar();
        DataSourceDto dataSourceDto = dataSourceDao.get(tar);
        DataSource dataSource = DataSourceFactory.getDataSource(dataSourceDto);
        Map<MigrationObjectType, List<? extends MigrationObject>> orderedDbObjects = migrationObjectService.getMigrationObjects(dboDto);
        if (migrationThread ==null) {
            migrationThread = MigrationThread.build(dboDto, orderedDbObjects, dataSource,recordDao, taskDao);
            migrationThread.init(dmtConfig);
            _threadTable.put(taskId, migrationThread);
            log.info("create [ReplicationThread] mission : "+taskId);
        } else if(!migrationThread.isAlive()) {
            migrationThread = MigrationThread.build(dboDto, orderedDbObjects, dataSource,recordDao, taskDao);
            migrationThread.init(dmtConfig);
            _threadTable.put(taskId, migrationThread);
            log.info("rebuild [ReplicationThread] mission : "+taskId);
        }
        if (migrationThread.getMigrationTaskStatus().equals(MigrationTaskStatus.RUNNING)
                && migrationThread.isAlive()
                && !migrationThread.isStop()) {
            log.info("mission is already running ... "+taskId);
        } else {
            migrationThread.start();
        }
    }

    public void runMigrationExecution(DboDto dboDto){
        String taskId = dboDto.getTask().getId();
        MigrationThread migrationThread = _threadTable.get(taskId);
        if (ObjectUtils.isEmpty(dboDto)){
            throw new MigrationException("cannot find the converted dboDto which taskId is "+"taskId");
        }
        String tar = dboDto.getTask().getTar();
        DataSourceDto dataSourceDto = dataSourceDao.get(tar);
        DataSource dataSource = DataSourceFactory.getDataSource(dataSourceDto);
        Map<MigrationObjectType, List<? extends MigrationObject>> orderedDbObjects = migrationObjectService.getMigrationObjects(dboDto);
        if (migrationThread ==null) {
            migrationThread = MigrationThread.build(dboDto, orderedDbObjects, dataSource,recordDao, taskDao);
            migrationThread.init(dmtConfig);
            _threadTable.put(taskId, migrationThread);
            log.info("create [ReplicationThread] mission : "+taskId);
        } else if(!migrationThread.isAlive()) {
            migrationThread = MigrationThread.build(dboDto, orderedDbObjects, dataSource,recordDao, taskDao);
            migrationThread.init(dmtConfig);
            _threadTable.put(taskId, migrationThread);
            log.info("rebuild [ReplicationThread] mission : "+taskId);
        }
        if (migrationThread.getMigrationTaskStatus().equals(MigrationTaskStatus.RUNNING)
                && migrationThread.isAlive()
                && !migrationThread.isStop()) {
            log.info("mission is already running ... "+taskId);
        } else {
            migrationThread.start();
        }
    }

    public MigrationTaskStatus getStatus(String taskId) {
        MigrationTaskStatus taskStatus = MigrationObjectService.getTaskStatus(taskId);
        if (taskStatus == null) {
            Task task = taskDao.get(taskId);
            taskStatus = task.getStatus();
            // 如果缓存里面没有，任务里面的状态却是各种进行中的状态， 说明dmt已经被重启过
            // 设置任务状态为失败并返回失败状态
            MigrationThread migrationThread = _threadTable.get(taskId);
            if (migrationThread == null) {
                if (MigrationTaskStatus.COLLECTING == taskStatus
                        || MigrationTaskStatus.CONVERTING == taskStatus
                        || MigrationTaskStatus.RUNNING == taskStatus) {
                    taskStatus = MigrationTaskStatus.FAILED;
                    task.setStatus(MigrationTaskStatus.FAILED);
                    taskDao.update(task);
                }
            }
            if (MigrationTaskStatus.PRECHECKING == taskStatus) {
                taskStatus = MigrationTaskStatus.PRECHECKED_FAILED;
                task.setStatus(MigrationTaskStatus.FAILED);
                taskDao.update(task);
            }
        }
        return taskStatus;
    }

    private MigrationTaskStatus getTaskStatusByMigrationRecord(MigrationRecordDto recordDto){
        MigrationTaskStatus migrationTaskStatus = null;
            List<ExecuteRecord> records = new ArrayList<>();
            if (MapUtils.isEmpty(recordDto.getSchemaRecordsMap())){
                migrationTaskStatus = MigrationTaskStatus.UNSTART;
            } else {
                recordDto.getSchemaRecordsMap().values().forEach(records::addAll);
                int sum = records.stream().mapToInt(ExecuteRecord::getFailed).sum();
                if (sum > 0){
                    migrationTaskStatus = MigrationTaskStatus.FAILED;
                } else {
                    migrationTaskStatus = MigrationTaskStatus.SUCCESS;
                }
            }
        return migrationTaskStatus;
    }

    public MigrationRecordDto getRecordById(String taskId){
        MigrationRecordDto recordDto = null;
        MigrationThread migrationThread = _threadTable.get(taskId);
        if ( null == migrationThread){
            List<MigrationRecordDto> list = recordDao.list(taskId);
            if (!CollectionUtils.isEmpty(list)){
                recordDto = list.stream().max(Comparator.comparing(MigrationRecordDto::getEndTime)).get();
            }
        } else {
            recordDto = migrationThread.getRunningRecord();
        }
        return recordDto;
    }


}
